require_relative '../common'
require 'net/ssh/transport/session'
require 'net/ssh/proxy/http'
require 'logger'

# Mocha mixes a #verify method into Object, which throws off the
# host-key-verifier part of these tests, so remove it when it is present.
#
# Module#method_defined? takes the name as a symbol and matches public and
# protected instance methods, so the check does not care whether
# #instance_methods reports strings (ruby18) or symbols (ruby19).
Object.send(:undef_method, :verify) if Object.method_defined?(:verify)

module Transport
  # Tests for Net::SSH::Transport::Session. The socket, the server version and
  # the algorithms object are all mocha stubs (see the private helpers at the
  # bottom of the class); Socket.tcp / the proxy's #open are stubbed to hand
  # back the stub socket.
  class TestSession < NetSSHTest
    include Net::SSH::Transport::Constants

    TEST_HOST = "net.ssh.test"
    TEST_PORT = 22

    # --- construction and host key verifier selection ----------------------

    def test_constructor_defaults
      assert_equal TEST_HOST, session.host
      assert_equal TEST_PORT, session.port

      verifier = session.host_key_verifier
      assert_instance_of Net::SSH::Verifiers::AcceptNewOrLocalTunnel, verifier
    end

    def test_verify_host_key_true_uses_accept_new_or_local_tunnel_verifier
      expect_warning 'verify_host_key: true is deprecated, use :accept_new_or_local_tunnel'
      assert_verifier Net::SSH::Verifiers::AcceptNewOrLocalTunnel, true
    end

    def test_verify_host_key_accept_new_or_local_tunnel_uses_accept_new_or_local_tunnel_verifier
      assert_verifier Net::SSH::Verifiers::AcceptNewOrLocalTunnel, :accept_new_or_local_tunnel
    end

    def test_verify_host_key_nil_uses_accept_new_or_local_tunnel_verifier
      assert_verifier Net::SSH::Verifiers::AcceptNewOrLocalTunnel, nil
    end

    def test_verify_host_key_very_uses_accept_new_verifier
      expect_warning 'verify_host_key: :very is deprecated, use :accept_new'
      assert_verifier Net::SSH::Verifiers::AcceptNew, :very
    end

    def test_verify_host_key_accept_new_uses_accept_new_verifier
      assert_verifier Net::SSH::Verifiers::AcceptNew, :accept_new
    end

    def test_verify_host_key_secure_uses_always_verifier
      expect_warning 'verify_host_key: :secure is deprecated, use :always'
      assert_verifier Net::SSH::Verifiers::Always, :secure
    end

    def test_verify_host_key_false_uses_never_verifier
      expect_warning 'verify_host_key: false is deprecated, use :never'
      assert_verifier Net::SSH::Verifiers::Never, false
    end

    # NOTE: despite "null" in the name, the value under test is :never.
    def test_verify_host_key_null_uses_never_verifier
      assert_verifier Net::SSH::Verifiers::Never, :never
    end

    def test_unknown_verify_host_key_value_raises_exception_if_value_does_not_respond_to_verify
      assert_raises(ArgumentError) do
        session(verify_host_key: :bogus).host_key_verifier
      end
    end

    def test_verify_host_key_value_responding_to_verify_should_pass_muster
      custom_verifier = stub("thingy", verify: true, verify_signature: true)
      chosen = session(verify_host_key: custom_verifier).host_key_verifier
      assert_equal custom_verifier, chosen
    end

    def test_deprecated_host_key_verifier
      expect_warning 'Warning: verifier without :verify_signature is deprecated'

      legacy_verifier = stub("thingy", verify: true)
      chosen = session(verify_host_key: legacy_verifier).host_key_verifier
      assert_not_nil chosen
    end

    # --- host_as_string -----------------------------------------------------

    def test_host_as_string_should_return_host_and_ip_when_port_is_default
      session!
      stub_peer_ip "1.2.3.4"
      expected = "#{TEST_HOST},1.2.3.4"
      assert_equal expected, session.host_as_string
    end

    def test_host_as_string_should_return_host_and_ip_with_port_when_port_is_not_default
      # the first call instantiates the (memoized) session with the custom port
      session(port: 1234)
      stub_peer_ip "1.2.3.4"
      expected = "[#{TEST_HOST}]:1234,[1.2.3.4]:1234"
      assert_equal expected, session.host_as_string
    end

    def test_host_as_string_should_return_only_host_when_host_is_ip
      session!(host: "1.2.3.4")
      stub_peer_ip "1.2.3.4"
      assert_equal "1.2.3.4", session.host_as_string
    end

    def test_host_as_string_should_return_only_host_and_port_when_host_is_ip_and_port_is_not_default
      session!(host: "1.2.3.4", port: 1234)
      stub_peer_ip "1.2.3.4"
      assert_equal "[1.2.3.4]:1234", session.host_as_string
    end

    def test_host_as_string_should_return_only_host_when_proxy_command_is_set
      session!(host: "1.2.3.4")
      stub_peer_ip Net::SSH::Transport::PacketStream::PROXY_COMMAND_HOST_IP
      assert_equal "1.2.3.4", session.host_as_string
    end

    def test_host_as_string_should_return_only_host_and_port_when_host_is_ip_and_port_is_not_default_and_proxy_command_is_set
      session!(host: "1.2.3.4", port: 1234)
      stub_peer_ip Net::SSH::Transport::PacketStream::PROXY_COMMAND_HOST_IP
      assert_equal "[1.2.3.4]:1234", session.host_as_string
    end

    # --- close / service_request / rekeying / peer -------------------------

    def test_close_should_cleanup_and_close_socket
      session!
      %i[cleanup close].each { |message| socket.expects(message) }
      session.close
    end

    def test_service_request_should_return_buffer
      request = session.service_request('sftp')
      assert_equal "\005\000\000\000\004sftp", request.to_s
    end

    def test_rekey_when_kex_is_pending_should_do_nothing
      algorithms.stubs(:pending?).returns(true)
      algorithms.expects(:rekey!).never
      session.rekey!
    end

    def test_rekey_when_no_kex_is_pending_should_initiate_rekey_and_block_until_it_completes
      algorithms.stubs(:pending?).returns(false)
      algorithms.expects(:rekey!)
      session.expects(:wait).yields
      algorithms.expects(:initialized?).returns(true)
      session.rekey!
    end

    def test_rekey_as_needed_when_kex_is_pending_should_do_nothing
      session!
      algorithms.stubs(:pending?).returns(true)
      socket.expects(:if_needs_rekey?).never
      session.rekey_as_needed
    end

    def test_rekey_as_needed_when_no_kex_is_pending_and_no_rekey_is_needed_should_do_nothing
      session!
      algorithms.stubs(:pending?).returns(false)
      socket.stubs(:if_needs_rekey?).returns(false)
      session.expects(:rekey!).never
      session.rekey_as_needed
    end

    def test_rekey_as_needed_when_no_kex_is_pending_and_rekey_is_needed_should_initiate_rekey_and_block
      session!
      algorithms.stubs(:pending?).returns(false)
      socket.expects(:if_needs_rekey?).yields
      session.expects(:rekey!)
      session.rekey_as_needed
    end

    def test_peer_should_return_hash_of_info_about_peer
      session!
      stub_peer_ip "1.2.3.4"

      expected = {
        ip: "1.2.3.4",
        port: TEST_PORT,
        host: TEST_HOST,
        canonized: "net.ssh.test,1.2.3.4"
      }
      assert_equal(expected, session.peer)
    end

    # --- next_message / poll_message ---------------------------------------

    def test_next_message_should_block_until_next_message_is_available
      session.expects(:poll_message).with(:block)
      session.next_message
    end

    def test_poll_message_should_query_next_packet_using_the_given_blocking_parameter
      session!
      socket.expects(:next_packet).with(:blocking_parameter, nil).returns(nil)
      session.poll_message(:blocking_parameter)
    end

    def test_poll_message_should_query_next_packet_using_the_timeout_option
      session!(timeout: 7)
      socket.expects(:next_packet).with(:nonblock, 7).returns(nil)
      session.poll_message
    end

    def test_poll_message_should_default_to_non_blocking
      session!
      socket.expects(:next_packet).with(:nonblock, nil).returns(nil)
      session.poll_message
    end

    # NOTE: despite "silently" in the name, the assertion is that a DISCONNECT
    # packet makes poll_message raise Net::SSH::Disconnect.
    def test_poll_message_should_silently_handle_disconnect_packets
      session!
      disconnect = P(:byte, DISCONNECT, :long, 1, :string, "testing", :string, "")
      socket.expects(:next_packet).returns(disconnect)
      assert_raises(Net::SSH::Disconnect) do
        session.poll_message
      end
    end

    def test_poll_message_should_silently_handle_ignore_packets
      session!
      assert_poll_skips(P(:byte, IGNORE, :string, "test"))
    end

    def test_poll_message_should_silently_handle_unimplemented_packets
      session!
      assert_poll_skips(P(:byte, UNIMPLEMENTED, :long, 15))
    end

    def test_poll_message_should_silently_handle_debug_packets_with_always_display
      session!
      assert_poll_skips(debug_packet(true))
    end

    def test_poll_message_should_silently_handle_debug_packets_without_always_display
      session!
      assert_poll_skips(debug_packet(false))
    end

    def test_poll_message_should_silently_handle_kexinit_packets
      session!
      kexinit = P(:byte, KEXINIT, :raw, "lasdfalksdjfa;slkdfja;slkfjsdfaklsjdfa;df")
      socket.expects(:next_packet).twice.returns(kexinit, nil)
      # the KEXINIT packet must be handed to the algorithms object
      algorithms.expects(:accept_kexinit).with(kexinit)
      assert_nil session.poll_message
    end

    def test_poll_message_should_return_other_packets
      session!
      incoming = service_accept_packet
      socket.expects(:next_packet).returns(incoming)
      assert_equal incoming, session.poll_message
    end

    def test_poll_message_should_enqueue_packets_when_algorithm_disallows_packet
      session!
      blocked = service_accept_packet
      disallow blocked
      socket.expects(:next_packet).twice.returns(blocked, nil)
      assert_nil session.poll_message
      assert_equal [blocked], session.queue
    end

    def test_poll_message_should_read_from_queue_when_next_in_queue_is_allowed_and_consume_queue_is_true
      session!
      queued = service_accept_packet
      session.push(queued)
      socket.expects(:next_packet).never
      assert_equal queued, session.poll_message
      assert session.queue.empty?
    end

    def test_poll_message_should_not_read_from_queue_when_next_in_queue_is_not_allowed
      session!
      queued = service_accept_packet
      disallow queued
      session.push(queued)
      socket.expects(:next_packet).returns(nil)
      assert_nil session.poll_message
      assert_equal [queued], session.queue
    end

    def test_poll_message_should_not_read_from_queue_when_consume_queue_is_false
      session!
      queued = service_accept_packet
      session.push(queued)
      socket.expects(:next_packet).returns(nil)
      assert_nil session.poll_message(:nonblock, false)
      assert_equal [queued], session.queue
    end

    # --- wait / push / send / enqueue --------------------------------------

    def test_wait_with_block_should_return_immediately_if_block_returns_truth
      session.expects(:poll_message).never
      session.wait { true }
    end

    def test_wait_should_not_consume_queue_on_reads
      block_calls = 0
      session.expects(:poll_message).with(:nonblock, false).returns(nil)
      # false on the first evaluation, true on the second
      session.wait do
        block_calls += 1
        block_calls > 1
      end
    end

    def test_wait_without_block_should_return_after_first_read
      session.expects(:poll_message).returns(nil)
      session.wait
    end

    def test_wait_should_enqueue_packets
      session!

      request = P(:byte, SERVICE_REQUEST, :string, "test")
      accept = service_accept_packet
      socket.expects(:next_packet).twice.returns(request, accept)

      block_calls = 0
      # false for the first two evaluations, true on the third
      session.wait do
        block_calls += 1
        block_calls > 2
      end
      assert_equal [request, accept], session.queue
    end

    def test_push_should_enqueue_packet
      pushed = service_accept_packet
      session.push(pushed)
      assert_equal [pushed], session.queue
    end

    def test_send_message_should_delegate_to_socket
      session!
      outgoing = service_accept_packet
      socket.expects(:send_packet).with(outgoing)
      session.send_message(outgoing)
    end

    def test_enqueue_message_should_delegate_to_socket
      session!
      outgoing = service_accept_packet
      socket.expects(:enqueue_packet).with(outgoing)
      session.enqueue_message(outgoing)
    end

    # --- configuration and hints -------------------------------------------

    def test_configure_client_should_pass_options_to_socket_client_state
      session.configure_client(compression: :standard)
      assert_equal :standard, socket.client.compression
    end

    def test_configure_server_should_pass_options_to_socket_server_state
      session.configure_server(compression: :standard)
      assert_equal :standard, socket.server.compression
    end

    def test_hint_should_set_hint_on_socket
      assert !socket.hints[:authenticated]
      session.hint(:authenticated)
      assert socket.hints[:authenticated]
    end

    # --- logging -----------------------------------------------------------

    # Logger that writes into an in-memory StringIO so tests can inspect
    # everything that was logged via #messages.
    class TestLogger < Logger
      def initialize
        @captured = StringIO.new
        super(@captured)
      end

      # Returns all output written to the logger so far as one String.
      def messages
        @captured.string
      end
    end

    def test_log_correct_debug_with_proxy
      log = TestLogger.new
      http_proxy = Net::SSH::Proxy::HTTP.new("")

      session!(logger: log, proxy: http_proxy)
      assert_match "establishing connection to #{TEST_HOST}:#{TEST_PORT} through proxy", log.messages
    end

    def test_log_correct_debug_without_proxy
      log = TestLogger.new

      session!(logger: log)
      assert_match "establishing connection to #{TEST_HOST}:#{TEST_PORT}", log.messages
    end

    private

    # Memoized stub used in place of the connection socket.
    def socket
      @socket ||= stub("socket", hints: {})
    end

    # Memoized stub returned by the stubbed ServerVersion.new.
    def server_version
      @server_version ||= stub("server_version")
    end

    # Memoized stub returned by the stubbed Algorithms.new; by default it
    # reports itself as initialized and allows every packet.
    def algorithms
      @algorithms ||= stub("algorithms", initialized?: true, allow?: true, start: true)
    end

    # Returns the session under test, building it on the first call only.
    # +options+ are therefore honoured solely by that first call; a :host
    # entry overrides TEST_HOST and is removed from the hash before the rest
    # is passed on to Session.new.
    def session(options = {})
      @session ||= build_session(options)
    end
    # a simple alias to make the tests more self-documenting. the bang
    # version makes it look more like the session is being instantiated
    alias session! session

    # Stubs out the collaborators and instantiates a real Transport::Session.
    def build_session(options)
      host = options.delete(:host) || TEST_HOST
      stub_connection(host, options)
      Net::SSH::Transport::ServerVersion.stubs(:new).returns(server_version)
      Net::SSH::Transport::Algorithms.stubs(:new).returns(algorithms)

      Net::SSH::Transport::Session.new(host, options)
    end

    # Makes either the given proxy or Socket.tcp return the stub socket.
    def stub_connection(host, options)
      proxy = options[:proxy]
      if proxy
        proxy.stubs("open").returns(socket)
      else
        port = options[:port] || TEST_PORT
        Socket.stubs(:tcp).with(host, port, nil, nil, connect_timeout: options[:timeout]).returns(socket)
      end
    end

    # Expects exactly one Kernel.warn call carrying +message+.
    def expect_warning(message)
      Kernel.expects(:warn).with(message)
    end

    # Asserts that a session created with verify_host_key: +value+ ends up
    # with a host key verifier that is an instance of +expected_class+.
    def assert_verifier(expected_class, value)
      verifier = session(verify_host_key: value).host_key_verifier
      assert_instance_of expected_class, verifier
    end

    # Makes the stub socket report +ip+ as its peer address.
    def stub_peer_ip(ip)
      socket.stubs(peer_ip: ip)
    end

    # Marks +packet+ as not allowed by the algorithms stub.
    def disallow(packet)
      algorithms.stubs(:allow?).with(packet).returns(false)
    end

    # Builds a fresh SERVICE_ACCEPT packet for the "test" service.
    def service_accept_packet
      P(:byte, SERVICE_ACCEPT, :string, "test")
    end

    # Builds a DEBUG packet with the given always-display flag.
    def debug_packet(always_display)
      P(:byte, DEBUG, :bool, always_display, :string, "testing", :string, "")
    end

    # Feeds +packet+ followed by nil to the session and asserts that
    # poll_message reads both and returns nil.
    def assert_poll_skips(packet)
      socket.expects(:next_packet).twice.returns(packet, nil)
      assert_nil session.poll_message
    end
  end
end
